package com.kaigejava.flink.chapter01;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author 凯哥Java
 * @description Counts the occurrences of each word in a text file using Flink's
 * batch-processing DataSet API.
 * Approach:
 * 1. Read the text file.
 * 2. Take each line of input.
 * 3. Split each line into words and group by word.
 * 4. Count the number of elements in each group.
 *
 * @company
 * @since 2022/11/21 14:58
 */
public class BatchWordCount {

    /**
     * Entry point. Steps:
     * 1. Create the execution environment.
     * 2. Read the input file line by line (each element is one line of text).
     * 3. Transform each line into (word, 1) tuples.
     * 4. Group the tuples by word.
     * 5. Sum the counts within each group.
     * 6. Print the results.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if reading the file or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        // 1: Create the batch execution environment (DataSet API).
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2: Read the file line by line. Use a forward slash so the relative
        // path resolves on both Windows and Unix-like systems (the original
        // "input\\words.txt" only worked on Windows).
        // If words.txt cannot be found, see:
        // http://www.kaigejava.com/newuiarticle/detail/1170
        DataSource<String> lineDS = env.readTextFile("input/words.txt");

        // 3: Split each line on spaces and emit a (word, 1L) tuple per word.
        // Java lambdas lose generic type information to erasure, so the
        // explicit .returns(...) hint is required for Flink's type system.
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOneTuple = lineDS
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    for (String word : line.split(" ")) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4: Group by the word, addressed by tuple field index 0.
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneTupleGrouping =
                wordAndOneTuple.groupBy(0);

        // 5: Sum the counts (tuple field index 1) within each group.
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneTupleGrouping.sum(1);

        // 6: Trigger execution and print each (word, count) pair to stdout.
        sum.print();
    }
}
